/**

Copyright (C) SYSTAP, LLC DBA Blazegraph 2006-2016.  All rights reserved.

Contact:
     SYSTAP, LLC DBA Blazegraph
     2501 Calvert ST NW #106
     Washington, DC 20008
     licenses@blazegraph.com

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/
/*
 * Created on Mar 15, 2007
 */

package com.bigdata.service;

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

import com.bigdata.btree.ITuple;
import com.bigdata.btree.ITupleIterator;
import com.bigdata.config.LongValidator;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.ICounterSetAccess;
import com.bigdata.counters.Instrument;
import com.bigdata.ha.HAStatusEnum;
import com.bigdata.journal.ITransactionService;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;
import com.bigdata.journal.RunState;
import com.bigdata.journal.TimestampUtility;
import com.bigdata.journal.ValidationError;
import com.bigdata.resources.ResourceManager;
import com.bigdata.util.InnerCause;
import com.bigdata.util.MillisecondTimestampFactory;

/**
 * Centralized transaction manager service. In response to a client request, the
 * transaction manager will distribute prepare/commit or abort operations to all
 * data services on which writes were made by a transaction. The transaction
 * manager also provides global timestamps required for non-transactional commit
 * points and various other purposes.
 * 
 * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
 * @version $Id$
 * 
 * @todo failover. the service instances will need to track active/committed
 *       transactions, complain if their clocks get out of alignment, and refuse
 *       to generate a timestamp that would go backwards when compared to the
 *       timestamp generated by the last master service.
 */
abstract public class AbstractTransactionService extends AbstractService
        implements ITransactionService, IServiceShutdown, ICounterSetAccess {

    /**
     * Logger for this class; it is <code>protected</code> so subclasses may
     * share it.
     */
    protected static final Logger log = Logger.getLogger(AbstractTransactionService.class);

//    protected static final boolean INFO = log.isInfoEnabled();

//    protected static final boolean DEBUG = log.isDebugEnabled();
    
    /**
     * Options understood by this service.
     * 
     * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
     * @version $Id$
     */
    public interface Options {

        /**
         * How long you want to hold onto the database history (in milliseconds)
         * or {@link Long#MAX_VALUE} for an (effectively) immortal database. The
         * value is parsed by the service constructor using
         * <code>LongValidator.GTE_ZERO</code> (presumably rejecting negative
         * values). The {@link ITransactionService} tracks the timestamp
         * corresponding to the earliest running transaction (if any). When such
         * a transaction exists, the actual release time is:
         * 
         * <pre>
         * releaseTime = min(lastCommitTime - 1, min(earliestRunningTx, now - minimumReleaseAge))
         * </pre>
         * 
         * This ensures that history in use by running transactions is not
         * released even when the minimumReleaseAge is ZERO (0).
         * <p>
         * When no transactions exist the actual release time is:
         * 
         * <pre>
         * releaseTime = min(lastCommitTime - 1, now - minimumReleaseAge)
         * </pre>
         * 
         * This ensures that the release time advances when no transactions
         * are in use, but that the minimum release age is still respected.
         * 
         * @see #DEFAULT_MIN_RELEASE_AGE
         * @see #MIN_RELEASE_AGE_1H
         * @see #MIN_RELEASE_AGE_1D
         * @see #MIN_RELEASE_AGE_1W
         * @see #MIN_RELEASE_AGE_NEVER
         * 
         * @see AbstractTransactionService#updateReleaseTime(long, TxState)
         * @see AbstractTransactionService#notifyCommit(long)
         */
        String MIN_RELEASE_AGE = AbstractTransactionService.class.getName()
                + ".minReleaseAge";

        /**
         * Minimum release age is zero (0). A value of ZERO (0) implies that any
         * history not required for the read-committed view is released each
         * time the {@link ResourceManager} overflows.
         */
        String MIN_RELEASE_AGE_NO_HISTORY = "0";

        /** Minimum release age is one minute (60,000 ms). */
        String MIN_RELEASE_AGE_1M = "" + 1/* mn */* 60/* sec */* 1000/* ms */;

        /** Minimum release age is five minutes (300,000 ms). */
        String MIN_RELEASE_AGE_5M = "" + 5/* mn */* 60/* sec */* 1000/* ms */;

        /** Minimum release age is one hour (3,600,000 ms). */
        String MIN_RELEASE_AGE_1H = "" + 1/* hr */* 60/* mn */* 60/* sec */
                * 1000/* ms */;

        /** Minimum release age is one day (86,400,000 ms). */
        String MIN_RELEASE_AGE_1D = "" + 24/* hr */* 60/* mn */* 60/* sec */
                * 1000/* ms */;

        /** Minimum release age is one week (604,800,000 ms). */
        String MIN_RELEASE_AGE_1W = "" + 7/* d */* 24/* hr */* 60/* mn */
                * 60/* sec */
                * 1000/* ms */;

        /**
         * Immortal database: the minimum release age is set to
         * {@link Long#MAX_VALUE}, so no history is ever released (the
         * effective release time stays at ZERO).
         */
        String MIN_RELEASE_AGE_NEVER = "" + Long.MAX_VALUE;

        /**
         * Default minimum release age is one (1) millisecond (only the last
         * commit point will be retained after a full compacting merge). This
         * causes the RWStore to use its recycler mode by default rather than
         * its session protection mode.
         * 
         * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/638"
         *      >Change DEFAULT_MIN_RELEASE_AGE to 1ms</a>
         */
        String DEFAULT_MIN_RELEASE_AGE = "1";
//        String DEFAULT_MIN_RELEASE_AGE = MIN_RELEASE_AGE_NO_HISTORY;

    }
    
    /**
     * Error message used when a write operation is requested for a read-only
     * transaction.
     */
    protected static final transient String ERR_READ_ONLY = "Read-only";
    
    /**
     * Error message used when the transaction is not known to this service.
     */
    protected static final transient String ERR_NO_SUCH = "Unknown transaction";
    
    /**
     * Error message used when a transaction is no longer active.
     */
    protected static final transient String ERR_NOT_ACTIVE = "Not active";

    /**
     * Error message used when the transaction service is not in a run state
     * which permits the requested operation (e.g. {@link #newTx(long)} outside
     * of {@link TxServiceRunState#Running}, or {@link #nextTimestamp()} while
     * {@link TxServiceRunState#Starting}).
     */
    protected static final transient String ERR_SERVICE_NOT_AVAIL = "Service not available";

    /**
     * The run state for the transaction service.
     * <p>
     * Initialized by the constructor and afterwards changed only through
     * {@link #setRunState(TxServiceRunState)}, which requires {@link #lock}.
     * Declared <code>volatile</code> so that lock-free readers such as
     * {@link #isOpen()} and {@link #nextTimestamp()} see the latest value.
     */
    private volatile TxServiceRunState runState;

    /**
     * A private copy (clone) of the caller's properties, made by the
     * constructor.
     */
    private final Properties properties;
    
    /**
     * The minimum age in milliseconds before history may be released. Parsed
     * once by the constructor from {@link Options#MIN_RELEASE_AGE}.
     * 
     * @see Options#MIN_RELEASE_AGE
     */
    final private long minReleaseAge;
    
    /**
     * Return the properties used to initialize the service.
     * <p>
     * The result is a fresh {@link Properties} object whose <em>defaults</em>
     * are this service's private copy. Values are therefore visible through
     * {@link Properties#getProperty(String)}, and any changes the caller makes
     * to the returned object do not leak back into the service.
     */
    protected Properties getProperties() {

        final Properties view = new Properties(properties);

        return view;

    }
    
    /**
     * A hash map containing all active transactions. A transaction that is
     * preparing will remain in this collection until it has completed (aborted
     * or committed). The key is the txId of the transaction: the signed value
     * (<code>state.tx</code>), NOT its absolute value.
     * 
     * @todo config param for the initial capacity of the map.
     * @todo config for the concurrency rating of the map.
     */
    final private ConcurrentHashMap<Long, TxState> activeTx = new ConcurrentHashMap<Long, TxState>();

    /**
     * Look up the {@link TxState} for a transaction identifier.
     * <p>
     * Note: This method is an internal API. The caller must adhere to the
     * internal synchronization APIs for the transaction service.
     * 
     * @param tx
     *            The transaction identifier (the signed value, NOT the absolute
     *            value).
     * 
     * @return The {@link TxState} -or- <code>null</code> if there is no such
     *         active transaction.
     */
    protected TxState getTxState(final long tx) {

        final TxState state = activeTx.get(tx);

        return state;

    }
    
    /**
     * Return the number of open transactions, whatever their {@link RunState}.
     */
    final public int getActiveCount() {

        final int n = activeTx.size();

        return n;

    }
    
    public AbstractTransactionService(final Properties properties) {
        
        this.properties = (Properties) properties.clone();

        {
            
            this.minReleaseAge = LongValidator.GTE_ZERO.parse(
                    Options.MIN_RELEASE_AGE, properties.getProperty(
                            Options.MIN_RELEASE_AGE,
                            Options.DEFAULT_MIN_RELEASE_AGE));

            if (log.isInfoEnabled())
                log.info(Options.MIN_RELEASE_AGE + "=" + minReleaseAge);
        
        }
        
        runState = TxServiceRunState.Starting;
        
    }

    /**
     * The service is open in every run state except
     * {@link TxServiceRunState#Halted}. This does not require {@link #lock}.
     */
    @Override
    public boolean isOpen() {

        final boolean halted = (runState == TxServiceRunState.Halted);

        return !halted;

    }
    
    /**
     * Verify that the service is open.
     * 
     * @throws IllegalStateException
     *             if {@link #isOpen()} reports <code>false</code>.
     */
    protected void assertOpen() {

        if (isOpen()) {
            return;
        }

        throw new IllegalStateException();

    }

    /**
     * Return the current {@link TxServiceRunState}.
     * 
     * @throws IllegalMonitorStateException
     *             unless the caller holds {@link #lock}.
     */
    public TxServiceRunState getRunState() {

        final boolean ownsLock = lock.isHeldByCurrentThread();

        if (!ownsLock) {
            throw new IllegalMonitorStateException();
        }

        return runState;

    }
    
    /**
     * Transition the service to a new {@link TxServiceRunState}.
     * 
     * @param newval
     *            The target state.
     * 
     * @throws IllegalMonitorStateException
     *             unless the caller holds {@link #lock}.
     * @throws IllegalStateException
     *             if moving from the current state to <i>newval</i> is not a
     *             legal transition.
     */
    synchronized protected void setRunState(final TxServiceRunState newval) {

        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }

        final TxServiceRunState current = runState;

        if (!current.isTransitionLegal(newval)) {
            throw new IllegalStateException("runState=" + current
                    + ", but newval=" + newval);
        }

        runState = newval;

        if (log.isInfoEnabled())
            log.info("runState=" + runState);

    }
    
    /**
     * Polite shutdown. New transactions will not start. This method will block
     * until existing transactions (both read-write and read-only) are complete
     * (either aborted or committed).
     * <p>
     * If the calling thread is interrupted while waiting, the service falls
     * back to {@link #shutdownNow()}. The thread's interrupt status is then
     * restored before this method returns, so the caller can still observe
     * the interrupt.
     */
    @Override
    public void shutdown() {

        if(log.isInfoEnabled()) 
            log.info("");

        lock.lock();
        try {

            switch (getRunState()) {
            case Shutdown:
            case ShutdownNow:
            case Halted:
                // Already shutting down (politely or not) or halted.
                return;
            }

            // Do not allow new transactions to start.
            setRunState(TxServiceRunState.Shutdown);

            try {

                /*
                 * Wait for running transactions to complete.
                 * 
                 * NOTE(review): the log interval is 10ms, so a WARN progress
                 * message can be emitted roughly every 10ms while transactions
                 * remain active. Confirm this interval is intended.
                 */
                awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS);

            } catch (InterruptedException ex) {

                // convert to fast shutdown.

                log.warn("Interrupted during shutdown - will do fast shutdown: "+ex, ex);

                try {

                    shutdownNow();

                } finally {

                    /*
                     * Throwing InterruptedException cleared the thread's
                     * interrupt status. Restore it so the interrupt is not
                     * lost. This is done after shutdownNow() so that the fast
                     * shutdown itself runs with the flag clear.
                     */
                    Thread.currentThread().interrupt();

                }

                return;

            }

            super.shutdown();

            // Service is halted.
            setRunState(TxServiceRunState.Halted);

        } finally {

            lock.unlock();
            
        }

    }

    /**
     * Wait until active transactions complete.
     * <p>
     * Note: The caller MUST hold {@link #lock}. This method awaits
     * {@link #txDeactivate}, which is a {@link Condition} created from that
     * lock.
     * <p>
     * A progress message is logged via {@link #logTimeout(long, TimeUnit)}
     * whenever at least <i>logTimeout</i> has elapsed since the previous
     * message (or since entry). Wake-ups that occur before that interval has
     * elapsed do not reset the logging clock.
     * 
     * @param logTimeout
     *            The minimum interval between {@link #logTimeout(long, TimeUnit)}
     *            messages.
     * @param unit
     *            The unit for that interval.
     * 
     * @throws InterruptedException
     *             if this method is interrupted.
     */
    private void awaitRunningTx(long logTimeout, final TimeUnit unit)
            throws InterruptedException {

        final long begin = System.nanoTime();

        // Time of the last progress message (initially the time of entry).
        long lastLogTime = begin;

        // convert to nanoseconds.
        logTimeout = unit.toNanos(logTimeout);

        long elapsed = 0L;

        if(log.isInfoEnabled())
            log.info("activeCount="+getActiveCount());

        while (getActiveCount() > 0) {

            // wait for a transaction to complete (or the log interval to pass).
            if (txDeactivate.await(logTimeout, TimeUnit.NANOSECONDS)
                    && getActiveCount() == 0) {

                // no more tx are active.

                // update the elapsed time.
                elapsed = System.nanoTime() - begin;

                if(log.isInfoEnabled())
                    log.info("No transactions remaining: elapsed="+elapsed);
                
                return;
                
            }

            final long now = System.nanoTime();

            // update the elapsed time.
            elapsed = now - begin;

            if (now - lastLogTime >= logTimeout) {

                try {

                    logTimeout(elapsed, TimeUnit.NANOSECONDS);
                    
                } catch (Throwable t) {
                    
                    log.error("Ignored", t);
                    
                }

                /*
                 * Only restart the logging clock when a message was actually
                 * emitted. Otherwise frequent early wake-ups (a tx completing
                 * while others remain active) would keep pushing the next
                 * message out indefinitely.
                 */
                lastLogTime = now;

            }

        } // while(activeCount > 0)

    }

    /**
     * Logs a periodic progress message (at WARN) while a shutdown waits for
     * active transactions to complete.
     * 
     * @param elapsed
     *            The elapsed time since shutdown was requested.
     * @param unit
     *            The unit in which <i>elapsed</i> is measured.
     */
    private void logTimeout(final long elapsed, final TimeUnit unit) {

        // Convert using the caller-supplied unit rather than assuming nanos.
        final long elapsedMillis = unit.toMillis(elapsed);

        log.warn("Waiting on task(s)" + ": elapsed="
                + elapsedMillis + "ms, #active="
                + getActiveCount() + ", #readWrite="
                + getReadWriteActiveCount() + ", #readOnly="
                + getReadOnlyActiveCount());

    }
    
    /**
     * Fast shutdown. This is not immediate, because the active transactions
     * must first be aborted.
     * <p>
     * New transactions will not start and active transactions will be aborted.
     * Transactions which are concurrently committing MAY fail (throwing
     * exceptions from various methods, including {@link #nextTimestamp()})
     * when the service halts. Calling this when the service is already in
     * {@link TxServiceRunState#ShutdownNow} or
     * {@link TxServiceRunState#Halted} is a no-op.
     */
    @Override
    public void shutdownNow() {

        if (log.isInfoEnabled()) {
            log.info("");
        }

        lock.lock();

        try {

            final TxServiceRunState current = getRunState();

            switch (current) {
            case ShutdownNow:
            case Halted:
                // Already going down fast, or fully halted.
                return;
            default:
                break;
            }

            // Refuse new work, abort whatever is still running, then halt.
            setRunState(TxServiceRunState.ShutdownNow);
            abortAllTx();
            super.shutdownNow();
            setRunState(TxServiceRunState.Halted);

        } finally {

            lock.unlock();

        }

    }

    /**
     * Abort all active transactions.
     * <p>
     * Acquires the outer {@link #lock} first. It then takes each
     * {@link TxState#lock} in turn, which follows the documented lock order
     * (outer lock before per-tx lock). Each transaction that is still active
     * is aborted via <code>abortImpl</code>; failures are logged and the tx is
     * deactivated regardless. The release time is then updated, and waiters on
     * {@link #txDeactivate} are signaled once at the end. A WARN is logged if
     * any transactions remain in {@link #activeTx} afterwards.
     */
    public void abortAllTx() {

        lock.lock();

        try {

            for (long tx : activeTx.keySet()) {

                final TxState state = activeTx.get(tx);

                if (state == null) {

                    /*
                     * Note: the entry may have been removed concurrently (the
                     * tx completed after the key set was read), so a null
                     * lookup is possible and is simply skipped.
                     */

                    continue;

                }

                state.lock.lock();

                try {

                    if (state.isActive()) {

                        // if (!state.isReadOnly()) {

                        try {

                            abortImpl(state);

                            assert state.isAborted() : state.toString();

                        } catch (Throwable t) {

                            // Best-effort: log and continue with the next tx.
                            log.error(state.toString(), t);

                        } finally {

                            // Always remove the tx from the local tables.
                            deactivateTx(state);

                        }

                    }

                } finally {

                    state.lock.unlock();

                    /*
                     * Note: We are already holding the outer lock so we do not
                     * need to acquire it here. The per-tx lock has already been
                     * released above, as required by the lock ordering rule.
                     */
                    updateReleaseTime(Math.abs(state.tx), null/* deactivatedTx */);

                }

            } // foreach tx in activeTx

            // signal once now that we are done.
            txDeactivate.signalAll();

            final int activeCount = getActiveCount();

            if (activeCount != 0) {

                log.warn("Service shutdown with active transactions: #nactive="
                        + activeTx.size());

            }
        } finally {

            lock.unlock();

        }

    }
    
    /**
     * Immediate/fast shutdown of the service and then destroys any persistent
     * state associated with the service.
     * <p>
     * Note: This method is intentionally NOT <code>synchronized</code>.
     * Entering the monitor on <code>this</code> and then acquiring
     * {@link #lock} would invert the lock order used elsewhere. For example,
     * {@link #shutdown()} holds {@link #lock} and then enters the
     * <code>synchronized</code> {@link #setRunState(TxServiceRunState)}. The
     * inverted order can deadlock when the two run concurrently. The
     * {@link #lock} alone provides the required exclusion here.
     */
    @Override
    public void destroy() {

        log.warn("");

        lock.lock();

        try {

            shutdownNow();

            // Note: no persistent state in this abstract impl.

        } finally {

            lock.unlock();

        }

    }

    /**
     * Return the next distinct timestamp.
     * <p>
     * Note: This method is allowed in all run states (after startup) since so
     * much depends on the ability to obtain timestamps, including the
     * unisolated operations on individual journals or data services. It does
     * not acquire {@link #lock}.
     * 
     * @throws IllegalStateException
     *             if the service is still {@link TxServiceRunState#Starting}.
     */
    @Override
    public long nextTimestamp() {

        final TxServiceRunState state = runState;

        switch (state) {
        case Starting:
            throw new IllegalStateException(ERR_SERVICE_NOT_AVAIL);
        default:
            return _nextTimestamp();
        }

    }
    
    /**
     * Obtain the next timestamp from {@link MillisecondTimestampFactory},
     * record it in {@link #lastTimestamp}, and return it. This private version
     * is also used by {@link #start()}.
     * 
     * TODO Why is this synchronized(this)? The timestamp factory is
     * synchronized internally and {@link #lastTimestamp} is volatile.
     */
    synchronized private final long _nextTimestamp() {

        final long ts = MillisecondTimestampFactory.nextMillis();

        lastTimestamp = ts;

        return ts;

    }
    /** The last timestamp issued by {@link #_nextTimestamp()}. */
    private volatile long lastTimestamp;
    
    /**
     * {@inheritDoc}
     * <p>
     * Note: There is an upper bound of one read-write transaction that may be
     * created per millisecond (the resolution of {@link #nextTimestamp()}) and
     * requests for new read-write transactions contend with other requests for
     * {@link #nextTimestamp()}.
     * <p>
     * Note: The transaction service will refuse to start new transactions whose
     * timestamps are LTE to {@link #getReleaseTime()}.
     * <p>
     * Note: If the calling thread is interrupted while a transaction
     * identifier is being assigned, its interrupt status is restored before
     * the {@link InterruptedException} is rethrown, wrapped in a
     * {@link RuntimeException}.
     * 
     * @throws IllegalStateException
     *             if the service is not {@link TxServiceRunState#Running}.
     * @throws RuntimeException
     *             Wrapping {@link TimeoutException} if a timeout occurs
     *             awaiting a start time which would satisfy the request for a
     *             read-only transaction (this can occur only for read-only
     *             transactions which must contend for start times which will
     *             read from the appropriate historical commit point), or
     *             wrapping {@link InterruptedException} if interrupted.
     */
    @Override
    public long newTx(final long timestamp) {

        setupLoggingContext();

        try {

            /*
             * Note: It may be possible to increase the concurrency of this
             * operation. Many cases do not allow contention since they will
             * just use the value returned by nextTimestamp(), which is always
             * distinct. Those cases which do allow contention involve search
             * for a start time that can read from a specific commit point. Even
             * then we may be able to reduce contention using atomic operations
             * on [activeTx], e.g., putIfAbsent().
             * 
             * However, pay attention to [lock]. Certainly it is serializing
             * newTx() at this point as well several other methods on this API.
             * Higher concurrency will require relaxing constraints on atomic
             * state transitions governed by [lock]. Perhaps by introducing
             * additional locks that are more specific. I don't want to relax
             * those constraints until I have a better sense of what must be
             * exclusive operations.
             */

            lock.lock();
            try {

                switch (getRunState()) {
                case Running:
                    break;
                default:
                    throw new IllegalStateException(ERR_SERVICE_NOT_AVAIL);
                }

                final TxState txState = assignTransactionIdentifier(timestamp);

                activateTx(txState);

                return txState.tx;

            } catch(TimeoutException ex) {
                
                throw new RuntimeException(ex);
                
            } catch(InterruptedException ex) {

                /*
                 * Restore the interrupt status (cleared when the exception was
                 * thrown) so it is not lost when converting to an unchecked
                 * exception.
                 */
                Thread.currentThread().interrupt();

                throw new RuntimeException(ex);
                
            } finally {

                lock.unlock();

            }

        } finally {

            clearLoggingContext();

        }

    }

    /**
     * A lock used to serialize certain operations that must be atomic with
     * respect to the state of the transaction service. Mostly this is used to
     * serialize the assignment of transaction identifiers and the update of the
     * release time as transactions complete.
     * <p>
     * Note: To avoid lock ordering problems DO NOT acquire this {@link #lock}
     * if you are already holding a {@link TxState#lock}. This causes a lock
     * ordering problem and can result in deadlock.
     * <p>
     * Note: Some code enters the monitor on <code>this</code> while holding
     * this lock, for example the <code>synchronized</code>
     * {@link #setRunState(TxServiceRunState)}. Avoid acquiring this lock while
     * already holding the monitor on <code>this</code>.
     */
    protected final ReentrantLock lock = new ReentrantLock();
    
    /**
     * Signaled by {@link #deactivateTx(TxState)} (and once by
     * {@link #abortAllTx()}) and based on {@link #lock}. A polite shutdown
     * awaits this condition while waiting for active transactions to finish.
     */
    protected final Condition txDeactivate = lock.newCondition();

    /**
     * #of transactions started (incremented by {@link #activateTx(TxState)}).
     * <p>
     * NOTE(review): this field, {@link #abortCount} and {@link #commitCount}
     * are plain (non-volatile) <code>long</code>s. Their getters read them
     * without holding {@link #lock}, so readers may observe stale values.
     */
    private long startCount = 0L;

    /** #of transactions aborted. */
    private long abortCount = 0L;

    /** #of transactions committed (does not count bare commits). */
    private long commitCount = 0L;

    /** #of active read-write transactions. */
    private final AtomicLong readWriteActiveCount = new AtomicLong(0L);

    /** #of active read-only transactions. */
    private final AtomicLong readOnlyActiveCount = new AtomicLong(0L);

    /** Return the number of transactions started. */
    public long getStartCount() {

        final long n = startCount;

        return n;

    }
    
    /** Return the number of transactions aborted. */
    public long getAbortCount() {

        final long n = abortCount;

        return n;

    }
    
    /** Return the number of transactions committed (bare commits excluded). */
    public long getCommitCount() {

        final long n = commitCount;

        return n;

    }
    
    /** Return the number of currently active read-only transactions. */
    public long getReadOnlyActiveCount() {

        return readOnlyActiveCount.longValue();

    }
    
    /** Return the number of currently active read-write transactions. */
    public long getReadWriteActiveCount() {

        return readWriteActiveCount.longValue();

    }
    
//    /**
//     * The minimum over the absolute values of the active transactions.
//     * <p>
//     * Note: This is a transaction identifier. It is NOT the commitTime on which
//     * that transaction is reading.
//     * 
//     * @see https://sourceforge.net/apps/trac/bigdata/ticket/467
//     */
//    public long getEarliestTxStartTime() {
//      
//      return earliestTxStartTime;
//      
//    }

    /**
     * Return the {@link TxState} for the earliest active tx, or
     * <code>null</code> if no tx is active.
     * <p>
     * Note: The {@link #lock} is required in order to make atomic decisions
     * about the earliest active tx. Without it, that tx could stop or a new tx
     * could start, invalidating the "earliest active" guarantee.
     * 
     * @throws IllegalMonitorStateException
     *             unless the {@link #lock} is held by the caller.
     */
    protected TxState getEarliestActiveTx() {

        final boolean ownsLock = lock.isHeldByCurrentThread();

        if (!ownsLock) {
            throw new IllegalMonitorStateException();
        }

        return earliestOpenTx;

    }
    
    /**
     * The earliest open transaction, or <code>null</code>.
     * <p>
     * Note: This field is guarded by the {@link #lock}. However, it is declared
     * <code>volatile</code> to provide visibility to {@link #getCounters()}
     * without taking the lock.
     * <p>
     * {@link #activateTx(TxState)} replaces it when a newly activated tx has a
     * smaller absolute txId. Locked readers use
     * {@link #getEarliestActiveTx()}.
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/467" >
     *      IllegalStateException trying to access lexicon index using RWStore
     *      with recycling </a>
     */
    private volatile TxState earliestOpenTx = null;
    
    /**
     * {@inheritDoc}
     * <p>
     * Reads the volatile {@link #releaseTime} without taking {@link #lock}.
     * 
     * @see Options#MIN_RELEASE_AGE
     */
    @Override
    public long getReleaseTime() {

        final long current = releaseTime;

        if (log.isTraceEnabled()) {
            log.trace("releaseTime=" + current + ", lastKnownCommitTime="
                    + getLastCommitTime());
        }

        return current;

    }
    /**
     * The current release time. It is assigned by
     * {@link #setReleaseTime(long)}, which requires {@link #lock} and refuses
     * to move it backwards. It is volatile so it can be read without the lock.
     */
    private volatile long releaseTime = 0L;
    
//    /** Note: This code is incorrect.
//     * Provides correct value for RWStore deferred store releases to be
//     * recycled. The effective release time does not need a lock since we are
//     * called from within the AbstractJournal commit. The calculation can safely
//     * be based on the system time, the min release age and the earliest active
//     * transaction. The purpose is to permit the RWStore to recycle data based
//     * on the release time which will be in effect at the commit point.
//     * 
//     * @return earliest time that data can be released
//     */
//  public long getEarliestReleaseTime() {
//      final long immediate = System.currentTimeMillis() - minReleaseAge;
//      
//      return earliestTxStartTime == 0 || immediate < earliestTxStartTime 
//          ? immediate : earliestTxStartTime;
//  }
    
    /**
     * Set a new release time.
     * <p>
     * Note: For a joined service in HA (the leader or a follower), the release
     * time is set by the consensus protocol. Otherwise it is automatically
     * maintained by {@link #updateReleaseTime(long, TxState)} and
     * {@link #updateReleaseTimeForBareCommit(long)}.
     * <p>
     * An attempt to move the release time backwards is rejected: an error is
     * logged (with a stack trace) and the current value is kept.
     * 
     * @param newValue
     *            The new value.
     * 
     * @throws IllegalMonitorStateException
     *             unless the caller holds {@link #lock}.
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/671">
     *      Query on follower fails during UPDATE on leader </a>
     */
    protected void setReleaseTime(final long newValue) {

        if (!lock.isHeldByCurrentThread())
            throw new IllegalMonitorStateException();

        final long oldValue = releaseTime;

        if (newValue >= oldValue) {

            if (log.isInfoEnabled())
                log.info("newValue=" + newValue);

            this.releaseTime = newValue;

            return;

        }

        // Going backwards: report loudly but do not throw.
        final String msg = "oldValue=" + oldValue + ", newValue="
                + newValue;

        log.error(msg, new RuntimeException(msg));

    }
    
    /**
     * This method was introduced to compute the effective timestamp of the
     * pinned history in support of the HA TXS. It <strong>ignores</strong> the
     * <code>releaseTime</code> and reports the minimum of:
     * <ul>
     * <li><code>lastCommitTime - 1</code> (never release the last commit
     * point);</li>
     * <li><code>readsOnCommitTime - 1</code> of the earliest active Tx, or
     * <code>now - 1</code> when there is no active Tx;</li>
     * <li><code>now - minReleaseAge</code>.</li>
     * </ul>
     * When <code>minReleaseAge</code> is {@link Long#MAX_VALUE} (all history
     * pinned) ZERO (0L) is reported without consulting the above.
     * <p>
     * Note: Apart from that {@link Long#MAX_VALUE} case the result is NOT
     * clamped at ZERO and may be negative. Examples: <code>-1</code> if
     * {@link #getLastCommitTime()} reports <code>0</code>, or a negative value
     * when a (very large) <code>minReleaseAge</code> exceeds <code>now</code>.
     * NOTE(review): a previous version of this comment stated that negative
     * values are reported as ZERO, which the code does not do. Confirm which
     * behavior the HA callers expect.
     * <p>
     * Note: The last commit time is read BEFORE {@link #lock} is acquired. The
     * earliest active tx and <code>now</code> are obtained while holding it.
     * <p>
     * Note: This duplicates logic in {@link #updateReleaseTime(long, TxState)},
     * but handles the special case in HA where the releaseTime is not being
     * updated by {@link #updateReleaseTimeForBareCommit(long)}.
     * 
     * @return The effective release time.
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/623"> HA
     *      TXS </a>
     * 
     * @see #updateReleaseTime(long, TxState)
     */
    protected long getEffectiveReleaseTimeForHA() {

        if (minReleaseAge == Long.MAX_VALUE) {

            // All history is pinned.
            return 0L;

        }
        
        final long lastCommitTime = getLastCommitTime();
        
        lock.lock();

        try {

            // Note: _nextTimestamp() is synchronized, so this also enters the
            // monitor on [this] while holding [lock].
            final long now = _nextTimestamp();

            // Find the earliest commit time pinned by an active tx.
            final long earliestTxReadsOnCommitTime;

            final TxState txState = getEarliestActiveTx();
            
            if (txState == null) {
                
                // No active tx. Use now.
                earliestTxReadsOnCommitTime = now;
                
            } else {
                
                // Earliest active tx.
                earliestTxReadsOnCommitTime = txState.readsOnCommitTime;
                
            }

            /*
             * The release time will be the minimum of:
             * 
             * a) The timestamp BEFORE the lastCommitTime.
             * 
             * b) The timestamp BEFORE the readsOnCommitTime of the earliest
             * active tx (or BEFORE now if there is no active tx).
             * 
             * c) minReleaseAge milliseconds in the past.
             * 
             * Note: NEVER let go of the last commit time!
             * 
             * @todo there is a fence post here for [now-minReleaseAge] when
             * minReleaseAge is very large, e.g., Long#MAX_VALUE. This is caught
             * above for that specific value, but other very large values could
             * also cause problems (the result can become negative).
             * 
             * @see https://sourceforge.net/apps/trac/bigdata/ticket/467
             */
            final long effectiveReleaseTimeForHA = Math.min(
                    lastCommitTime - 1,
                    Math.min(earliestTxReadsOnCommitTime - 1, now
                            - minReleaseAge));

            if (log.isDebugEnabled())
                log.debug("releaseTime=" + releaseTime //
                        + ", lastCommitTime=" + lastCommitTime
                        + ", earliestActiveTx=" + txState//
                        + ", readsOnCommitTime=" + earliestTxReadsOnCommitTime//
                        + ", (now-minReleaseAge)=" + (now - minReleaseAge)//
                        + ": effectiveReleaseTimeForHA=" + effectiveReleaseTimeForHA//
                        );

            return effectiveReleaseTimeForHA;

        } finally {
        
            lock.unlock();
            
        }
        
    }
    
    /**
     * Adds the transaction to the local tables: {@link #activeTx}, the start
     * time index, the earliest-open-tx tracker and the start/active counters.
     * <p>
     * NOTE(review): this method takes only {@link TxState#lock}. It also
     * updates {@link #earliestOpenTx} and <code>startCount</code>, which are
     * documented as guarded by {@link #lock}. It presumably expects the caller
     * to already hold {@link #lock}, as {@link #newTx(long)} does, but it does
     * not check this.
     * 
     * @param state
     *            The transaction (must be non-<code>null</code> and active).
     * 
     * @throws IllegalArgumentException
     *             if <i>state</i> is <code>null</code> or not active.
     */
    protected void activateTx(final TxState state) {

        if (state == null)
            throw new IllegalArgumentException();

        state.lock.lock();

        try {
        
            if (!state.isActive())
                throw new IllegalArgumentException();

            if (this.earliestOpenTx == null
                    || Math.abs(state.tx) < Math.abs(this.earliestOpenTx.tx)) {

                /*
                 * This is the earliest open transaction. This is defined as the
                 * transaction whose readsOnCommitTime is LTE all other
                 * transactions and whose absolute txId value is LT all other
                 * transactions. Since we assign the txIds in intervals GTE the
                 * readsOnCommitTime and LT the next possible commit point, we
                 * can maintain this invariant by only comparing abs(txId).
                 */

                this.earliestOpenTx = state;

            }
            
            // Keyed by the signed txId.
            activeTx.put(state.tx, state);
            
            synchronized(startTimeIndex) {
                
                /*
                 * Note: Using the absolute value of the assigned timestamp so
                 * that the index is ordered earliest to most recent. This means
                 * that the absolute value of the timestamps must be unique,
                 * otherwise this will throw out an exception.
                 */

//                startTimeIndex.add(Math.abs(state.tx), state.readsOnCommitTime);
                startTimeIndex.add(state);
                
            }
            
            startCount++;

            if(state.isReadOnly()) {
                
                readOnlyActiveCount.incrementAndGet();
                
            } else {
                
                readWriteActiveCount.incrementAndGet();
                
            }
            
            if (log.isInfoEnabled())
                log.info(state.toString() + ", releaseTime="+releaseTime+", earliestActiveTx="+earliestOpenTx+", startCount=" + startCount
                        + ", abortCount=" + abortCount + ", commitCount="
                        + commitCount + ", readOnlyActiveCount="
                        + readOnlyActiveCount + ", readWriteActiveCount="
                        + readWriteActiveCount);

        } finally {
            
            state.lock.unlock();
            
        }
        
    }

    /**
     * Looks up an active transaction and reports the commit point it reads
     * from.
     * <p>
     * Note: This method is exposed primarily for the unit tests.
     * 
     * @param txId
     *            The transaction identifier.
     * @return The commit time on which that transaction is reading.
     * @throws IllegalArgumentException
     *             if no active transaction has that identifier.
     */
    protected long getReadsOnTime(final long txId) {

        final TxState txState = activeTx.get(txId);

        if (txState != null) {
            return txState.readsOnCommitTime;
        }

        throw new IllegalArgumentException();

    }
    
    /**
     * Removes the transaction from the {@link #activeTx} map and updates the
     * abort/commit and active-transaction counters.
     * <p>
     * Note: This method does NOT remove the transaction from the
     * {@link #startTimeIndex}. That is done by
     * {@link #updateReleaseTime(long, TxState)}.
     * <p>
     * Note: The caller MUST own {@link TxState#lock} across this method. It
     * MUST then invoke
     * 
     * <pre>
     * updateReleaseTime(Math.abs(state.tx), state)
     * txDeactivate.signalAll()
     * </pre>
     * 
     * while holding the outer {@link #lock}.
     * <p>
     * Note: Normally this method is invoked without the outer {@link #lock}
     * being held. To avoid a lock ordering problem, the caller releases
     * {@link TxState#lock} before acquiring the outer {@link #lock}. That is
     * why those calls are lifted out of this method and into the caller (see
     * {@link #abort(long)}).
     * 
     * @param state
     *            The transaction.
     * 
     * @throws IllegalArgumentException
     *             if <i>state</i> is <code>null</code> or is not complete.
     * @throws IllegalMonitorStateException
     *             unless the caller is holding the {@link TxState#lock}.
     */
    protected void deactivateTx(final TxState state) {

        if (state == null)
            throw new IllegalArgumentException();

        if (!state.lock.isHeldByCurrentThread())
            throw new IllegalMonitorStateException();

        if (!state.isComplete())
            throw new IllegalArgumentException();

        // Tally how the transaction terminated.
        if (state.isAborted()) {

            abortCount++;

        } else {

            commitCount++;

        }

        // Undo the per-kind increment made when the tx was activated.
        if (state.isReadOnly()) {

            readOnlyActiveCount.decrementAndGet();

        } else {

            readWriteActiveCount.decrementAndGet();

        }

        if (activeTx.remove(state.tx) == null) {

            log.warn("Transaction not in table: " + state);

        }

        if (log.isInfoEnabled())
            log.info(state.toString() + ", startCount=" + startCount
                    + ", abortCount=" + abortCount + ", commitCount="
                    + commitCount + ", readOnlyActiveCount="
                    + readOnlyActiveCount + ", readWriteActiveCount="
                    + readWriteActiveCount);

    }
    
    /**
     * Reports whether the release time is advanced by a consensus protocol
     * rather than locally by this service.
     * <p>
     * <code>true</code> means the release time consensus protocol is in use.
     * This is the case for HA when this service is either the leader or a
     * follower. <code>false</code> means this service manages its own release
     * time. This is the case for non-HA, and for HA while the service is
     * {@link HAStatusEnum#NotReady}.
     * <p>
     * Note: When a 2-phase commit is used, the leader cannot update the
     * release time from commit() using this method. It must rely on the
     * consensus protocol to update the release time instead.
     * <p>
     * This base implementation always answers <code>false</code>.
     * 
     * @see <a href=
     *      "https://sourceforge.net/apps/trac/bigdata/ticket/530#comment:116">
     *      Journal HA </a>
     */
    protected boolean isReleaseTimeConsensusProtocol() {

        // Default (non-HA): the release time is always managed locally.
        return false;

    }
   
    /**
     * This method MUST be invoked each time a transaction completes with the
     * absolute value of the transaction identifier that has just been
     * deactivated. The method will remove the transaction entry in the ordered
     * set of running transactions ({@link #startTimeIndex}). It then
     * recomputes the earliest remaining active transaction and stores it in
     * [earliestOpenTx].
     * <p>
     * If the specified timestamp corresponds to the earliest running
     * transaction, then the <code>releaseTime</code> will be recomputed. A new
     * releaseTime is set using {@link #setReleaseTime(long)} only if doing so
     * would advance it. For HA, the releaseTime is updated by a consensus
     * protocol and the individual services MUST NOT advance their releaseTime
     * as transactions complete.
     * <p>
     * The release time is never advanced when the minimum release age is
     * {@link Long#MAX_VALUE}. In that case this method returns after updating
     * [earliestOpenTx].
     * <p>
     * Note: When we are using a 2-phase commit, the leader can not update the
     * release time from commit() using this method. It must rely on the
     * consensus protocol to update the release time instead.
     * 
     * @param timestamp
     *            The absolute value of a transaction identifier that has just
     *            been deactivated.
     * @param deactivatedTx
     *            The transaction object that has been deactivated -or-
     *            <code>null</code> if there are known to be no active
     *            transactions remaining (e.g., startup and abortAll()).
     * 
     * @throws IllegalArgumentException
     *             if <i>timestamp</i> is not positive.
     * @throws IllegalMonitorStateException
     *             unless the caller holds the outer {@link #lock}.
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/467" >
     *      IllegalStateException trying to access lexicon index using RWStore
     *      with recycling </a>
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/671">
     *      Query on follower fails during UPDATE on leader </a>
     * 
     * @todo the {@link #startTimeIndex} could be used by
     *       {@link #findUnusedTimestamp(long, long, long, TimeUnit)} so that it
     *       could further constrain its search within the half-open interval.
     */
    final protected void updateReleaseTime(final long timestamp,
            final TxState deactivatedTx) {

        if (timestamp <= 0)
            throw new IllegalArgumentException();

        /*
         * Note: The calculation of the new release time needs to be atomic.
         * 
         * Note: This uses the same lock that we use to create new transactions
         * in order to prevent a new transaction from starting while we are
         * updating the release time.
         * 
         * @todo Should this also be used to serialize both handing out commit
         * times (for 2-phase commits) and acknowledging commit times (for
         * single phase commits)?
         */
        if (!lock.isHeldByCurrentThread())
            throw new IllegalMonitorStateException();

        // current timestamp.
        final long now = _nextTimestamp();

        // current value for the releaseTime (used only for logging).
        final long oldReleaseTime = this.releaseTime;
        
        /*
         * true iff the tx specified by the caller was the earliest running
         * transaction (i.e., it was at position zero in the index).
         */
        final boolean isEarliestTx;

        // The earliest still-active tx, or null if there is none.
        TxState earliestActiveTx = null;
        
        synchronized (startTimeIndex) {

            // Note: ZERO (0) is the first tuple in the B+Tree.
            // Note: MINUS ONE (-1) means that the B+Tree is empty.
            // NOTE(review): if findIndexOf() follows the usual B+Tree
            // convention (-(insertionPoint+1) when the key is absent), -1
            // also means "absent and would insert at position 0", and other
            // negative values mean "absent" -- confirm against the index.
            final long indexOf = startTimeIndex.findIndexOf(timestamp);
            
            isEarliestTx = indexOf == 0;

            // remove start time from the index.
            if (indexOf != -1)
                startTimeIndex.remove(timestamp);

            if (startTimeIndex.getEntryCount() > 0) {
                /*
                 * There are remaining entries in the [startTimeIndex]. Scan it
                 * for the earliestActiveTx remaining.
                 * 
                 * Note: We need to handle a data race where the earliest
                 * tx in the [startTimeIndex] has been concurrently deactivated
                 * (and removed from the [activeTx] map) but not yet removed
                 * from the [startTimeIndex]. This is done by scanning until we
                 * find the first active tx in the [startTimeIndex]. It will
                 * typically be the first entry.
                 * 
                 * Note: Entries cannot be added to or removed from the
                 * [startTimeIndex] while we hold its monitor. Removal from the
                 * [activeTx] map is not guarded by this monitor, which is why
                 * the scan is necessary.
                 */

                @SuppressWarnings("rawtypes")
                final ITupleIterator titr = startTimeIndex.rangeIterator();

                while (titr.hasNext()) {

                    @SuppressWarnings("rawtypes")
                    final ITuple t = titr.next();

                    final ITxState0 x = (ITxState0) t.getObject();

                    // Lookup the [activeTx] map.
                    final TxState tmp = getTxState(x.getStartTimestamp());

                    if (tmp == null) {
                        
                        /*
                         * Transaction is no longer active (and no longer in the
                         * activeTx map).
                         */

                        continue;
                        
                    }

                    if (!tmp.isActive()) {

                        // Transaction is no longer active.
                        continue;
                        
                    }

                    // Must not be the tx that we just deactivated.
                    assert tmp != deactivatedTx;
                    
                    earliestActiveTx = tmp;
                 
                    break;
                    
                }

            } else {

                /*
                 * The [startTimeIndex] is empty: there are no active
                 * transactions.
                 */

                // There are no open transactions.
                earliestActiveTx = null;

            }

            // Update the field [volatile write].
            this.earliestOpenTx = earliestActiveTx;
            
            if (log.isTraceEnabled())
                log.trace("earliestActiveTx=" + earliestActiveTx);

        } // synchronized(startTimeIndex)

        // History is retained forever: never advance the release time.
        if (minReleaseAge == Long.MAX_VALUE) {

            return;
            
        }

        if (isEarliestTx && !isReleaseTimeConsensusProtocol()) {

            /*
             * The transaction that just finished was the earliest activeTx.
             * When no active tx remains, [now] stands in for both its start
             * time and its readsOnCommitTime.
             */
            
            final long earliestTxStartTime = earliestActiveTx == null ? now
                    : earliestActiveTx.tx;
            
            final long earliestTxReadsOnCommitTime = earliestActiveTx == null ? now
                    : earliestActiveTx.readsOnCommitTime;

            // last commit time on the database.
            final long lastCommitTime = getLastCommitTime();

            // minimum milliseconds to retain history. Note: this local shadows
            // the field of the same name; getMinReleaseAge() returns that
            // field, so the values are identical.
            final long minReleaseAge = getMinReleaseAge();

            /*
             * The release time will be the minimum of:
             * 
             * a) The timestamp BEFORE the lastCommitTime.
             * 
             * b) The timestamp BEFORE the commit point on which the earliest
             * remaining tx is reading (earliestTxReadsOnCommitTime). Using the
             * readsOnCommitTime rather than the tx start time is the fix for
             * ticket 467.
             * 
             * c) minReleaseAge milliseconds in the past.
             * 
             * Note: NEVER let go of the last commit time!
             * 
             * @todo there is a fence post here for [now-minReleaseAge] when
             * minReleaseAge is very large, e.g., Long#MAX_VALUE. This is caught
             * above for that specific value, but other very large values could
             * also cause problems.
             * 
             * @see https://sourceforge.net/apps/trac/bigdata/ticket/467
             */
            final long releaseTime = Math.min(
                    lastCommitTime - 1,
                    Math.min(earliestTxReadsOnCommitTime - 1, now
                            - minReleaseAge));

            /*
             * We only update the release time if the computed time would
             * advance the releaseTime.
             * 
             * Note: The releaseTime MUST NOT go backwards since the database
             * may have already released history for any commit point whose
             * commitTime is LTE to the existing releaseTime.
             */
            if (this.releaseTime < releaseTime) {

                if (log.isInfoEnabled())
                    log.info("lastCommitTime=" + lastCommitTime
                            + ", earliestTxStartTime=" + earliestTxStartTime
                            + ", minReleaseAge=" + minReleaseAge + ", now="
                            + now + ", releaseTime(" + oldReleaseTime + "->"
                            + releaseTime + ")");

                // update.
                setReleaseTime(releaseTime);

            }

        }

    }

    /**
     * Notification of a new commit time. The basic implementation may advance
     * the release time on each commit even when no transactions are in use,
     * by delegating to {@link #updateReleaseTimeForBareCommit(long)} while
     * holding the outer {@link #lock}.
     * <p>
     * Note: This needs to be a fairly low-latency operation since this method
     * is invoked for all commits on all data services and will otherwise be a
     * global hotspot.
     */
    @Override
    public void notifyCommit(final long commitTime) {

        // Serialize with tx start/finish. The delegate re-acquires the same
        // (reentrant) lock, which is harmless.
        lock.lock();
        try {
            updateReleaseTimeForBareCommit(commitTime);
        } finally {
            lock.unlock();
        }

    }

    /**
     * Advances the release time after a commit when no transactions are
     * running.
     * <p>
     * Nothing happens unless all of the following hold:
     * <ul>
     * <li>the release time consensus protocol is not in use;</li>
     * <li>the current releaseTime is LT <code>commitTime - 1</code>;</li>
     * <li>the {@link #startTimeIndex} is empty (no active transactions).</li>
     * </ul>
     * In that case the candidate release time is
     * <code>min(commitTime - 1, now - minReleaseAge)</code>. It is applied via
     * {@link #setReleaseTime(long)} only if it would advance the current
     * value.
     * <p>
     * Note: This logic was historically part of {@link #notifyCommit(long)}.
     * It lives in its own method so it can be overridden for some unit tests.
     * <p>
     * Note: When we are using a 2-phase commit, the leader can not update the
     * release time from commit() using this method. It must rely on the
     * consensus protocol to update the release time instead.
     * 
     * @param commitTime
     *            The commit time that was just made.
     */
    protected void updateReleaseTimeForBareCommit(final long commitTime) {

        lock.lock();
        try {

            synchronized (startTimeIndex) {

                final boolean eligible = !isReleaseTimeConsensusProtocol()
                        && this.releaseTime < (commitTime - 1)
                        && startTimeIndex.getEntryCount() == 0;

                if (!eligible)
                    return;

                final long now = _nextTimestamp();

                final long candidate = Math.min(commitTime - 1, now
                        - minReleaseAge);

                // The release time must never move backwards.
                if (this.releaseTime >= candidate)
                    return;

                if (log.isInfoEnabled())
                    log.info("Advancing releaseTime (no active tx)"
                            + ": lastCommitTime=" + commitTime
                            + ", minReleaseAge=" + minReleaseAge + ", now="
                            + now + ", releaseTime(" + this.releaseTime
                            + "->" + candidate + ")");

                setReleaseTime(candidate);

            }

        } finally {

            lock.unlock();

        }

    }

    /**
     * Returns the minimum number of milliseconds of history that must be
     * preserved. The release time is never advanced past
     * <code>now - minReleaseAge</code>.
     * 
     * @todo This centralizes the value for the minimum amount of history that
     *       will be preserved across the federation.
     *       <p>
     *       If minReleaseAge is increased, then the release time can be
     *       changed to match, but only by NOT advancing it until we are
     *       retaining enough history.
     *       <p>
     *       If minReleaseAge is decreased, then we can immediately release
     *       more history (or at least as soon as the task runs to notify the
     *       discovered data services of the new release time).
     */
    final public long getMinReleaseAge() {

        return this.minReleaseAge;

    }

    /**
     * A transient index of the active transactions whose keys are the
     * <strong>absolute value</strong> of their start times (transaction
     * identifiers). Entries are added by passing the {@link TxState} when a
     * transaction is activated. They are removed by absolute start time in
     * {@link #updateReleaseTime(long, TxState)}, where each tuple's value is
     * read back as an {@link ITxState0}. Presumably those objects also carry
     * the commit time on which each transaction is reading; confirm against
     * TxId2CommitTimeIndex.
     * <p>
     * Accesses in this class synchronize on the index itself.
     * <p>
     * Note: The absolute value constraint is imposed so that we can directly
     * identify the earliest active transaction in the index by its position (it
     * will be at position zero). This would not work if we let in negative
     * start times.
     * <p>
     * Note: In order to support this,
     * {@link #findUnusedTimestamp(long, long, long, TimeUnit)} will not return a
     * timestamp whose absolute value corresponds to an active transaction.
     */
    private final TxId2CommitTimeIndex startTimeIndex = TxId2CommitTimeIndex
            .createTransient();

    /**
     * Assign a transaction identifier for a new transaction.
     * <p>
     * The cases are:
     * <ul>
     * <li>{@link ITx#UNISOLATED}: a read-write transaction. Its identifier is
     * the next distinct timestamp with its sign flipped. It reads from the most
     * recent commit point.</li>
     * <li>The last commit time, or {@link ITx#READ_COMMITTED}: a read-only
     * transaction reading from the most recent commit point, identified by the
     * next distinct timestamp.</li>
     * <li>Any other timestamp GT the release time: a read-only transaction
     * reading from the historical commit point identified by
     * <i>timestamp</i>. It is assigned a distinct timestamp from the half-open
     * interval for that commit point.</li>
     * </ul>
     * 
     * @param timestamp
     *            The timestamp.
     * 
     * @return The new transaction object.
     * 
     * @throws IllegalStateException
     *             if <i>timestamp</i> is LTE the current release time (the
     *             history it would read may already have been released).
     * @throws InterruptedException
     *             if interrupted while awaiting a start time which would
     *             satisfy the request.
     * @throws TimeoutException
     *             if a timeout occurs while awaiting a start time which would
     *             satisfy the request.
     */
    final protected TxState assignTransactionIdentifier(final long timestamp)
            throws InterruptedException, TimeoutException {

        final long lastCommitTime = getLastCommitTime();

        if (timestamp == ITx.UNISOLATED) {

            /*
             * When timestamp is ZERO (0L), this simply returns the next
             * distinct timestamp (with its sign bit flipped).
             * 
             * Note: This is guaranteed to be a valid start time since it is LT
             * the next possible commit point for the database.
             * 
             * Note: When we validate, we will read from [-startTime] and the
             * journal will identify the 1st commit point LTE [-startTime],
             * which will be the most recent commit point on the database as of
             * the moment when we assigned this transaction identifier.
             */

            // The transaction will read from the most recent commit point.
            return new TxState(-nextTimestamp(), lastCommitTime);

        }

        if (timestamp == lastCommitTime || timestamp == ITx.READ_COMMITTED) {

            /*
             * A read-only transaction on the most recent commit point, either
             * requested explicitly by its commit time or via the symbolic
             * shorthand READ_COMMITTED.
             * 
             * Note: We can just issue the next timestamp since it will be GT
             * lastCommitTime and LT the next possible commit point.
             * 
             * Note: If [lastCommitTime == 0], we will still issue the next
             * timestamp.
             */

            // The transaction will read from the most recent commit point.
            return new TxState(nextTimestamp(), lastCommitTime);

        }

        final long releaseTime = getReleaseTime();

        if (timestamp <= releaseTime) {

            /*
             * This exception is thrown if there is an attempt to start a new
             * transaction that would read from historical data which has been
             * released. While the data MIGHT still be around, there is no way
             * to assert a read lock for that data since the releaseTime is
             * already in the future.
             */

            throw new IllegalStateException(
                    "Timestamp is less than or equal to the release time: timestamp="
                            + timestamp + ", releaseTime=" + releaseTime);

        }

        return getStartTime(timestamp);

    }

    /**
     * Assign a distinct timestamp to a historical read that will read from the
     * commit point identified by the specified timestamp.
     * <p>
     * Note: Under some circumstances the assignment of a read-only transaction
     * identifier must be delayed until a distinct timestamp becomes available
     * between the designated start time and the next commit point.
     * 
     * @param timestamp
     *            The timestamp (identifies the desired commit point).
     * 
     * @return A new transaction object using a distinct timestamp not in use by
     *         any transaction that will read from the same commit point. When
     *         there are no commit points at all, the transaction reads from
     *         commit time ZERO (0L).
     * 
     * @throws InterruptedException
     *             if interrupted while waiting for an unused timestamp.
     * @throws TimeoutException
     *             if no unused timestamp becomes available within 1000ms.
     */
    final private TxState getStartTime(final long timestamp)
            throws InterruptedException, TimeoutException {

        // Largest commit time LTE [timestamp], or -1L if there is none.
        final long commitTime = findCommitTime(timestamp);

        if (commitTime == -1L) {

            /*
             * There are no commit points in the log. The next timestamp is GT
             * the (non-existent) desired commit time and LT the next commit
             * point, so it can be used directly.
             */
            return new TxState(nextTimestamp(), 0L);

        }

        // Commit time strictly GT [commitTime], or -1L if there is none.
        final long nextCommitTime = findNextCommitTime(commitTime);

        final long txId;

        if (nextCommitTime == -1L) {

            /*
             * [commitTime] has no successor. The next timestamp is GT that
             * commit time and LT the next commit point. This is reachable,
             * e.g., when the caller asks for a timestamp GT the last commit
             * time.
             */
            txId = nextTimestamp();

        } else {

            // Any timestamp in [commitTime:nextCommitTime) reads from
            // [commitTime]; pick one that is not already in use.
            txId = findUnusedTimestamp(commitTime, nextCommitTime,
                    1000/* timeout */, TimeUnit.MILLISECONDS);

        }

        return new TxState(txId, commitTime);

    }

    /**
     * Find the commit time from which the tx will read (largest commitTime LTE
     * timestamp).
     * 
     * @param timestamp
     *            The timestamp.
     * 
     * @return The commit time -or- -1L if there is no such commit time.
     */
    protected abstract long findCommitTime(long timestamp);

    /**
     * Return the commit time of the successor of the commit point having the
     * specified commit time (a commit time strictly GT the given value).
     * 
     * @param commitTime
     *            The probe.
     * @return The successor -or- -1L iff there is no successor for that commit
     *         time.
     */
    protected abstract long findNextCommitTime(long commitTime);
    
    /**
     * Find a valid, unused timestamp.
     * <p>
     * Note: Any timestamp in the half-open range [commitTime:nextCommitTime)
     * MAY be assigned as all such timestamps will read from the commit point
     * associated with [commitTime]. A candidate is rejected if either it or
     * its negation identifies an active transaction.
     * <p>
     * If every candidate is in use, this waits on <code>txDeactivate</code>
     * for some transaction to terminate and then rescans the range. A wakeup
     * is always followed by a rescan, even if the deadline has just passed, so
     * a timestamp freed by the signalling transaction is not missed.
     * Presumably <code>txDeactivate</code> is a condition of the outer
     * {@link #lock} (abort() signals it while holding that lock), in which
     * case the caller must hold that lock.
     * 
     * @param commitTime
     *            The commit time for the commit point on which the tx will read
     *            (this must be the exact timestamp associated with the desired
     *            commit point).
     * @param nextCommitTime
     *            The commit time for the successor of that commit point.
     * @param timeout
     *            The maximum length of time to await an available timestamp.
     * @param unit
     *            The unit in which <i>timeout</i> is expressed.
     * 
     * @return A timestamp in [commitTime:nextCommitTime) whose value and
     *         negated value are not in use by an active transaction.
     * 
     * @throws InterruptedException
     *             if interrupted while waiting.
     * @throws TimeoutException
     *             if no such timestamp becomes available within the timeout.
     */
    protected long findUnusedTimestamp(final long commitTime,
            final long nextCommitTime, final long timeout, final TimeUnit unit)
            throws InterruptedException, TimeoutException {

        final long begin = System.nanoTime();
        final long nanos = unit.toNanos(timeout);

        while (true) {

            for (long t = commitTime; t < nextCommitTime; t++) {

                /*
                 * Note: We do not accept an active read-only startTime.
                 * 
                 * Note: We do not accept a start time that corresponds to the
                 * absolute value of an active read-write transaction either.
                 * This latter constraint is imposed so that the keys in the
                 * [startTimeIndex] can be the absolute value of the assigned
                 * timestamp and still be unique.
                 * 
                 * @todo We could grab the timestamp using an atomic
                 * putIfAbsent and a special value and then replace the value
                 * with the desired one (or just construct the TxState object
                 * each time and discard it if the map contains that key). This
                 * might let us increase concurrency for newTx().
                 */
                if (!activeTx.containsKey(t) && !activeTx.containsKey(-t)) {

                    return t;

                }

            }

            /*
             * Wait for a tx to terminate. If it is in the desired half-open
             * range it will be detected by the rescan above.
             * 
             * Note: This requires that we use signalAll() since we could be
             * waiting on more than one half-open range.
             * 
             * Note: [remaining] is computed from the elapsed time (not from an
             * absolute deadline) so that a huge timeout cannot overflow.
             * 
             * @todo if we used a Condition for the half-open range then we
             * could signal exactly that condition.
             */
            final long remaining = nanos - (System.nanoTime() - begin);

            if (remaining <= 0
                    || !txDeactivate.await(remaining, TimeUnit.NANOSECONDS)) {

                throw new TimeoutException("No unused timestamp in ["
                        + commitTime + ":" + nextCommitTime + ") after "
                        + timeout + " " + unit);

            }

            // Signalled (or spurious wakeup): always rescan before giving up.

        }

    }
    
    /**
     * Return the most recent commit time on the database.
     * <p>
     * Note: Declared abstract so that we can hide the {@link IOException}.
     */
    @Override
    abstract public long getLastCommitTime();

    /**
     * Implementation must abort the tx on the journal (standalone) or on each
     * data service (federation) on which it has written.
     * <p>
     * Pre-conditions:
     * <ol>
     * <li>The transaction is {@link RunState#Active}; and</li>
     * <li>The caller holds the {@link TxState#lock}.</li>
     * </ol>
     * <p>
     * Post-conditions:
     * <ol>
     * <li>The transaction is {@link RunState#Aborted}; and</li>
     * <li>The transaction write set has been discarded by each {@link Journal}
     * or {@link IDataService} on which it has written (applicable for
     * read-write transactions only).</li>
     * </ol>
     * 
     * @param state
     *            The transaction state as maintained by the transaction server.
     * 
     * @throws Exception
     *             if the abort fails.
     */
    abstract protected void abortImpl(final TxState state) throws Exception;

    /**
     * Implementation must either single-phase commit (standalone journal or a
     * transaction that only writes on a single data service) or 2-/3-phase
     * commit (distributed transaction running on a federation).
     * <p>
     * Pre-conditions:
     * <ol>
     * <li>The transaction is {@link RunState#Active}; and</li>
     * <li>The caller holds the {@link TxState#lock}.</li>
     * </ol>
     * <p>
     * Post-conditions (success for read-only transaction or a read-write
     * transaction with an empty write set):
     * <ol>
     * <li>The transaction is {@link RunState#Committed}; and</li>
     * <li>The returned <i>commitTime</i> is ZERO (0L).</li>
     * </ol>
     * <p>
     * Post-conditions (success for read-write transaction with a non-empty
     * write set):
     * <ol>
     * <li>The transaction is {@link RunState#Committed};</li>
     * <li>The transaction write set has been made restart-safe by each
     * {@link Journal} or {@link IDataService} on which it has written
     * (applicable for read-write transactions only); and</li>
     * <li>The application can read exactly the data written by the transaction
     * from the commit point identified by the returned <i>commitTime</i>.</li>
     * </ol>
     * <p>
     * Post-conditions (failure):
     * <ol>
     * <li>The transaction is {@link RunState#Aborted}; and</li>
     * <li>The transaction write set has been discarded by each {@link Journal}
     * or {@link IDataService} on which it has written (applicable for
     * read-write transactions only).</li>
     * </ol>
     * 
     * @param state
     *            The transaction state as maintained by the transaction server.
     * 
     * @return The commit time for the transaction -or- ZERO (0L) if the
     *         transaction was read-only or had an empty write set.
     * 
     * @throws Exception
     *             if something else goes wrong. This will be (or will wrap) a
     *             {@link ValidationError} if validation fails.
     */
    abstract protected long commitImpl(final TxState state) throws Exception;

    /**
     * Abort the transaction.
     * <p>
     * {@link #abortImpl(TxState)} is invoked on the caller's thread while
     * holding {@link TxState#lock}. Any {@link Throwable} it throws is logged
     * rather than propagated. If the transaction was active, it is then
     * deactivated. Finally, with {@link TxState#lock} released and the outer
     * {@link #lock} held, the release time is updated and waiters on
     * <code>txDeactivate</code> are signalled.
     * <p>
     * NOTE(review): if abortImpl() fails without leaving the transaction
     * complete, deactivateTx() throws IllegalArgumentException. That exception
     * propagates to the caller only after updateReleaseTime() has already
     * removed the tx from the [startTimeIndex], while the tx remains in
     * [activeTx]. Confirm that abortImpl() always marks the tx aborted.
     * 
     * @param tx
     *            The transaction identifier.
     * 
     * @throws IllegalStateException
     *             if the service is neither running nor shutting down, if
     *             there is no such transaction, or if the transaction is not
     *             active.
     */
    @Override
    public void abort(final long tx) {

        setupLoggingContext();
        
        try {

            switch (runState) {
            case Running:
            case Shutdown:
                break;
            default:
                throw new IllegalStateException(ERR_SERVICE_NOT_AVAIL);
            }
            
            final TxState state = activeTx.get(tx);

            if (state == null)
                throw new IllegalStateException(ERR_NO_SUCH);

            // Set once we know the tx was active; gates the cleanup below.
            boolean wasActive = false;
            state.lock.lock();

            try {

                if (!state.isActive()) {

                    throw new IllegalStateException(ERR_NOT_ACTIVE);

                }
                wasActive = true;

                try {

                    abortImpl(state);
                    
                    // Note: an AssertionError from this check is also caught
                    // and logged by the catch (Throwable) clause below.
                    assert state.isAborted() : state.toString();

                } catch (Throwable t) {

                    // Best-effort abort: failures are logged, not rethrown.
                    log.error(state.toString(),t);
                    
                }

            } finally {

                // Deactivation must happen while state.lock is still held
                // (deactivateTx() enforces this).
                try {
                    if (wasActive) {
                        deactivateTx(state);
                    }
                } finally {
                    /*
                     * Note: This avoids a lock ordering problem by releasing
                     * the inner lock (state.lock) before acquiring the outer
                     * lock.
                     */
                    state.lock.unlock();
                    if (wasActive) {
                        lock.lock();
                        try {
                            updateReleaseTime(Math.abs(state.tx), state/*deactivatedTx*/);
                            /*
                             * Note: signalAll() is required. See code that
                             * searches the half-open range for a
                             * read-historical timestamp. It waits on this
                             * signal, but there can be more than one request
                             * waiting and requests can be waiting on different
                             * half-open ranges.
                             */
                            txDeactivate.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            }

        } finally {

            clearLoggingContext();

        }

    }

    /**
     * Commit the transaction.
     * <p>
     * The service must be running or shutting down, and the transaction must
     * be known and currently active. Whether the commit succeeds or fails,
     * the transaction is deactivated. The release time is then updated, and
     * every thread waiting on {@code txDeactivate} is signalled.
     * 
     * @param tx
     *            The transaction identifier.
     * 
     * @return The commit time reported by {@code commitImpl} for the
     *         transaction.
     * 
     * @throws ValidationError
     *             if the commit failed because validation failed. The
     *             underlying failure is attached as the cause.
     * @throws IllegalStateException
     *             if the service is not available, if the transaction is
     *             unknown, or if the transaction is not active.
     * @throws RuntimeException
     *             wrapping any other failure reported by the commit.
     */
    @Override
    public long commit(final long tx) throws ValidationError {

        setupLoggingContext();

        try {

            switch (runState) {
            case Running:
            case Shutdown:
                break;
            default:
                throw new IllegalStateException(ERR_SERVICE_NOT_AVAIL);
            }

            final TxState state = activeTx.get(tx);

            if (state == null) {

                throw new IllegalStateException(ERR_NO_SUCH);

            }

            // Set once we know the tx was active and must be deactivated.
            boolean wasActive = false;
            state.lock.lock();

            try {

                if (!state.isActive()) {

                    throw new IllegalStateException(ERR_NOT_ACTIVE);

                }
                wasActive = true;

                try {

                    final long commitTime = commitImpl(state);

                    assert state.isCommitted() : "tx=" + state;

                    return commitTime;

                } catch (Throwable t2) {

                    assert state.isAborted() : "ex=" + t2 + ", tx=" + state;

                    if (t2 instanceof InterruptedException) {

                        // Do not lose the interrupt when wrapping the exception.
                        Thread.currentThread().interrupt();

                    }

                    if (InnerCause.isInnerCause(t2, ValidationError.class)) {

                        /*
                         * Report the failure using the checked exception
                         * declared by this method. Keep the original failure
                         * as the cause so that its details are not lost.
                         */
                        final ValidationError ex = new ValidationError();

                        try {
                            ex.initCause(t2);
                        } catch (IllegalStateException ignored) {
                            // The constructor already set a cause; keep it.
                        }

                        throw ex;

                    }

                    log.error(t2.getMessage(), t2);

                    throw new RuntimeException(t2);

                }

            } finally {

                try {
                    if (wasActive) {
                        deactivateTx(state);
                    }
                } finally {
                    /*
                     * Note: This avoids a lock ordering problem by releasing
                     * the inner lock (state.lock) before acquiring the outer
                     * lock.
                     */
                    state.lock.unlock();
                    if (wasActive) {
                        lock.lock();
                        try {
                            updateReleaseTime(Math.abs(state.tx),state/*deactivatedTx*/);
                            /*
                             * Note: signalAll() is required. See code that
                             * searches the half-open range for a
                             * read-historical timestamp. It waits on this
                             * signal, but there can be more than one request
                             * waiting and requests can be waiting on different
                             * half-open ranges.
                             */
                            txDeactivate.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }
                }

            }

        } finally {

            clearLoggingContext();

        }

    }

    /**
     * Transaction state as maintained by the {@link ITransactionService}.
     * <p>
     * Note: The commitTime and revisionTime are requested by the local
     * transaction manager for single phase commits. This class could therefore
     * only know their values for a distributed transaction commit. For that
     * reason the revisionTime is not represented here, and the commitTime is
     * only assigned for distributed read-write transactions (see
     * {@link #getCommitTime()}).
     * <p>
     * Thread-safety: most methods require the caller to hold {@link #lock} and
     * throw {@link IllegalMonitorStateException} otherwise. The
     * {@link ITxState} status methods (isActive(), etc.) do not require the
     * lock; they perform a volatile read of the run state.
     */
    public class TxState implements ITxState {

        /**
         * The transaction identifier.
         */
        public final long tx;
        
        /**
         * The commit time associated with the commit point against which this
         * transaction will read. This will be <code>0</code> IFF there are no
         * commit points yet. Otherwise it is a real commit time associated with
         * some existing commit point.
         */
        private final long readsOnCommitTime;
        
        /**
         * <code>true</code> iff the transaction is read-only.
         */
        private final boolean readOnly;

        /**
         * The run state of the transaction.
         * <p>
         * Note: Writes to this field are guarded by the {@link #lock}. The
         * field is [volatile] so that the methods on the {@link ITxState}
         * interface (isActive(), etc.) can read it without the lock.
         */
        private volatile RunState runState = RunState.Active;
        
        /**
         * Change the {@link RunState}.
         * 
         * @param newval
         *            The new {@link RunState}.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalArgumentException
         *             if the argument is <code>null</code>.
         * @throws IllegalStateException
         *             if the state transition is not allowed.
         * 
         * @see RunState#isTransitionAllowed(RunState)
         */
        public void setRunState(final RunState newval) {

            if (!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            if (newval == null)
                throw new IllegalArgumentException();
            
            if (!runState.isTransitionAllowed(newval)) {

                throw new IllegalStateException("runState=" + runState
                        + ", newValue=" + newval);

            }

            this.runState = newval;
            
        }

        @Override
        final public long getStartTimestamp() {

            return tx;
            
        }

        @Override
        final public long getReadsOnCommitTime() {
            
            return readsOnCommitTime;
            
        }
        
        /**
         * The commit time assigned to a distributed read-write transaction
         * during the commit protocol and otherwise ZERO (0L).
         * <p>
         * Note: This field is guarded by the {@link #lock}.
         */
        private long commitTime = 0L;
        
        /**
         * The commit time assigned to a distributed read-write transaction
         * during the commit protocol.
         * 
         * @return The assigned commit time.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalStateException
         *             if the commit time has not been assigned.
         */
        public long getCommitTime() {
            
            if (!lock.isHeldByCurrentThread()) {

                throw new IllegalMonitorStateException();
                
            }
            
            if (commitTime == 0L) {

                throw new IllegalStateException();
                
            }
            
            return commitTime;
            
        }

        /**
         * Sets the assigned commit time. The commit time may only be assigned
         * once.
         * 
         * @param commitTime
         *            The assigned commit time (must not be ZERO).
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalArgumentException
         *             if <i>commitTime</i> is ZERO (0L).
         * @throws IllegalStateException
         *             if the commit time was already assigned.
         */
        protected void setCommitTime(final long commitTime) {

            if (!lock.isHeldByCurrentThread()) {

                throw new IllegalMonitorStateException();
                
            }
            
            if (commitTime == 0L) {
                
                throw new IllegalArgumentException();
                
            }

            if (this.commitTime != 0L) {
                
                throw new IllegalStateException();
                
            }
            
            this.commitTime = commitTime;

        }
        
        /**
         * The set of {@link DataService}s on which a read-write transaction
         * has been started, or <code>null</code> if this is not a read-write
         * transaction.
         * <p>
         * Note: We only track this information for a distributed database.
         */
        private final Set<UUID/* dataService */> dataServices;

        /**
         * The set of named resources that the transaction has declared across
         * all {@link IDataService}s on which it has written, or
         * <code>null</code> if this is not a read-write transaction.
         * <p>
         * Note: We only track this information for a distributed database.
         */
        private final Set<String/* name */> resources;

        /**
         * Return the resources declared by the transaction.
         * 
         * @return A new array containing the declared resources in declaration
         *         order, or an empty array for a read-only transaction.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         */
        public String[] getResources() {

            if (!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            if (resources == null)
                return EMPTY;

            return resources.toArray(new String[0]);
            
        }
        
        /**
         * Return <code>true</code> iff the dataService identified by the
         * {@link UUID} is one on which this transaction has been started.
         * 
         * @param dataServiceUUID
         *            The {@link UUID} identifying an {@link IDataService}.
         * 
         * @return <code>true</code> if this transaction has been started on
         *         that {@link IDataService}. <code>false</code> for
         *         read-only transactions.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalArgumentException
         *             if <i>dataServiceUUID</i> is <code>null</code>.
         */
        public boolean isStartedOn(final UUID dataServiceUUID) {
            
            if(!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            if (dataServiceUUID == null)
                throw new IllegalArgumentException();
            
            if (dataServices == null)
                return false;

            return dataServices.contains(dataServiceUUID);
            
        }
        
        /**
         * The set of {@link DataService}s on which the transaction has
         * written.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalStateException
         *             if not a read-write transaction.
         */
        protected UUID[] getDataServiceUUIDs() {

            if(!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();
            
            if (dataServices == null)
                throw new IllegalStateException();
            
            return dataServices.toArray(new UUID[0]);
            
        }

        /**
         * A per-transaction lock used to serialize operations on a given
         * transaction. You need to hold this lock for most of the operations on
         * this class, including any change to the {@link RunState}.
         * <p>
         * Note: DO NOT attempt to acquire the outer
         * {@link AbstractTransactionService#lock} if you are already holding
         * this {@link #lock}. This is a lock ordering problem and can result in
         * a deadlock.
         */
        final protected ReentrantLock lock = new ReentrantLock();
        
        /**
         * 
         * @param tx
         *            The assigned transaction identifier.
         * @param readCommitTime
         *            The commit time associated with the commit point against
         *            which this transaction will read (may be ZERO if there are
         *            no commit points, must not be negative).
         * 
         * @throws IllegalArgumentException
         *             if <i>tx</i> is {@link ITx#UNISOLATED} or
         *             {@link ITx#READ_COMMITTED}, or if <i>readCommitTime</i>
         *             is negative.
         */
        protected TxState(final long tx, final long readCommitTime) {
            
            if (tx == ITx.UNISOLATED)
                throw new IllegalArgumentException();

            if (tx == ITx.READ_COMMITTED)
                throw new IllegalArgumentException();

            if (readCommitTime < 0)
                throw new IllegalArgumentException();

            this.tx = tx;
            
            this.readsOnCommitTime = readCommitTime;
            
            this.readOnly = TimestampUtility.isReadOnly(tx);
                       
            // pre-compute the hash code for the transaction.
            this.hashCode = Long.valueOf(tx).hashCode();

            // Write-set tracking only applies to read-write transactions.
            this.dataServices = readOnly ? null : new LinkedHashSet<UUID>();

            this.resources = readOnly ? null : new LinkedHashSet<String>();
            
        }

        /**
         * The hash code is based on the {@link #getStartTimestamp()}.
         */
        @Override
        final public int hashCode() {
            
            return hashCode;

        }

        private final int hashCode;

        /**
         * True iff they are the same object or have the same start timestamp.
         * <p>
         * Note: This class implements {@link ITxState} rather than
         * {@link ITx}, so the comparison must accept {@link ITxState}.
         * Otherwise two distinct {@link TxState} objects for the same
         * transaction would never compare equal. {@link ITx} objects are still
         * compared by their start timestamp, as before. This is consistent
         * with {@link #hashCode()}, which is derived from the start timestamp.
         * 
         * @param o
         *            Another transaction object.
         */
        @Override
        final public boolean equals(final Object o) {

            if (this == o)
                return true;

            if (o instanceof ITxState) {

                return tx == ((ITxState) o).getStartTimestamp();

            }

            if (o instanceof ITx) {

                return tx == ((ITx) o).getStartTimestamp();

            }

            return false;

        }

        /**
         * Declares resources on a data service instance on which the
         * transaction will write.
         * 
         * @param dataService
         *            The data service identifier.
         * @param resource
         *            An array of named resources on the data service on which
         *            the transaction will write (or at least for which it
         *            requires an exclusive write lock).
         * 
         * @throws IllegalArgumentException
         *             if either argument is <code>null</code>.
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the {@link #lock}.
         * @throws IllegalStateException
         *             if the transaction is read-only.
         * @throws IllegalStateException
         *             if the transaction is not active.
         */
        final public void declareResources(final UUID dataService,
                final String[] resource) {

            if (dataService == null)
                throw new IllegalArgumentException();
            
            if (resource == null)
                throw new IllegalArgumentException();
            
            if (!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            if (readOnly)
                throw new IllegalStateException(ERR_READ_ONLY);

            if (!isActive())
                throw new IllegalStateException(ERR_NOT_ACTIVE);

            dataServices.add(dataService);
            
            // Note: sufficient to prevent deadlocks when there are shared indices.
            resources.addAll(Arrays.asList(resource));
            
            if (log.isInfoEnabled())
                log.info("dataService=" + dataService + ", resource="
                        + Arrays.toString(resource));

        }

        /**
         * Return the #of {@link IDataService}s on which a read-write
         * transaction has executed an operation.
         * 
         * @return The #of {@link IDataService}.
         * 
         * @throws IllegalStateException
         *             if the transaction is read-only.
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the lock.
         */
        final public int getDataServiceCount() {
            
            if(!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            if(readOnly)
                throw new IllegalStateException(ERR_READ_ONLY);
            
            return dataServices.size();

        }
        
        /**
         * Return <code>true</code> iff a read-write transaction has started on
         * more than one {@link IDataService}.
         * 
         * @throws IllegalMonitorStateException
         *             if the caller does not hold the lock.
         */
        final boolean isDistributedTx() {

            if(!lock.isHeldByCurrentThread())
                throw new IllegalMonitorStateException();

            return !readOnly && dataServices.size() > 1;

        }

        /**
         * Returns a string representation of the transaction state.
         */
        @Override
        final public String toString() {

            /*
             * Note: info reported here MUST be safe and MUST NOT require a
             * lock!
             */
            
            return "GlobalTxState{tx=" + tx + ",readsOnCommitTime="
                    + readsOnCommitTime + ",readOnly=" + readOnly
                    + ",runState=" + runState + "}";

        }

        @Override
        final public boolean isReadOnly() {

            return readOnly;

        }

        @Override
        final public boolean isActive() {

            // No lock required: volatile read.
            return runState == RunState.Active;

        }

        @Override
        final public boolean isPrepared() {

            // No lock required: volatile read.
            return runState == RunState.Prepared;

        }

        @Override
        final public boolean isComplete() {

            // No lock required: single volatile read, then two comparisons.
            final RunState tmp = runState;

            return tmp == RunState.Committed || tmp == RunState.Aborted;

        }

        @Override
        final public boolean isCommitted() {

            // No lock required: volatile read.
            return runState == RunState.Committed;

        }

        @Override
        final public boolean isAborted() {

            // No lock required: volatile read.
            return runState == RunState.Aborted;

        }

    }

    /**
     * Starts the service.
     * <p>
     * The service must be in the {@link TxServiceRunState#Starting} state.
     * This method first checks that the clock does not report a timestamp
     * earlier than {@link #getLastCommitTime()}. It then computes the initial
     * release time and switches the service to
     * {@link TxServiceRunState#Running}. All of this is done while holding the
     * outer lock.
     * 
     * @return This service.
     * 
     * @throws IllegalStateException
     *             if the service is not in the starting state.
     * @throws RuntimeException
     *             if the clock reports a time before the last commit time.
     */
    @Override
    public AbstractTransactionService start() {

        if (log.isInfoEnabled()) {
            log.info("");
        }

        lock.lock();
        try {

            // Only a service which is still starting may be started.
            switch (getRunState()) {
            case Starting:
                break;
            default:
                throw new IllegalStateException();
            }

            final long now = _nextTimestamp();
            final long lastCommit = getLastCommitTime();

            final boolean clockIsBehind = lastCommit > now;
            if (clockIsBehind) {
                throw new RuntimeException(
                        "Clock reporting timestamps before lastCommitTime: now="
                                + new Date(now) + ", lastCommitTime="
                                + new Date(lastCommit));
            }

            /*
             * Compute the releaseTime on startup. No transactions are running
             * yet, so the freshly issued timestamp is LT any possible
             * transaction identifier, even though it is not itself a
             * transaction.
             */
            updateReleaseTime(now, null/* deactivatedTx */);

            setRunState(TxServiceRunState.Running);

        } finally {
            lock.unlock();
        }

        return this;

    }

    /**
     * Reports the service interface exposed by this service.
     * 
     * @return The {@link ITransactionService} interface class.
     */
    @Override
    @SuppressWarnings("rawtypes")
    public Class getServiceIface() {

        final Class iface = ITransactionService.class;

        return iface;

    }
    
    /**
     * Shared zero-length array returned by {@link TxState#getResources()} when
     * a transaction does not track resources. It can safely be shared because
     * a zero-length array has no elements to mutate.
     */
    private static final String[] EMPTY = new String[0];

    /**
     * Return a new {@link CounterSet} reporting the state of this service. Each
     * counter samples its value from the service when the counter is read.
     * 
     * @return The {@link CounterSet}.
     */
    public CounterSet getCounters() {
        
        final CounterSet countersRoot = new CounterSet();

        countersRoot.addCounter("runState", new Instrument<String>() {
            @Override
            protected void sample() {
                setValue(runState.toString());
            }
        });

        countersRoot.addCounter("#active", new Instrument<Integer>() {
            @Override
            protected void sample() {
                setValue(getActiveCount());
            }
        });

        countersRoot.addCounter("lastCommitTime", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getLastCommitTime());
            }
        });

        countersRoot.addCounter("minReleaseAge", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getMinReleaseAge());
            }
        });

        countersRoot.addCounter("releaseTime", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getReleaseTime());
            }
        });

        countersRoot.addCounter("startCount", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getStartCount());
            }
        });

        countersRoot.addCounter("abortCount", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getAbortCount());
            }
        });

        countersRoot.addCounter("commitCount", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getCommitCount());
            }
        });

        countersRoot.addCounter("readOnlyActiveCount", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getReadOnlyActiveCount());
            }
        });

        countersRoot.addCounter("readWriteActiveCount", new Instrument<Long>() {
            @Override
            protected void sample() {
                setValue(getReadWriteActiveCount());
            }
        });

        /*
         * Reports the commit time on which the earliest open transaction is
         * reading -or- ZERO (0L) if there is no open transaction.
         * 
         * Note: This is the readsOnCommitTime of that tx. It is NOT the txId.
         * ZERO is reported explicitly when no tx is open. Otherwise the last
         * sampled value would linger after that tx completed.
         */
        countersRoot.addCounter("earliestReadsOnCommitTime",
                new Instrument<Long>() {
                    @Override
                    protected void sample() {
                        final TxState tmp = earliestOpenTx;
                        setValue(tmp == null ? 0L : tmp.readsOnCommitTime);
                    }
                });

        return countersRoot;
        
    }


}
